package com.zengqingfa.rocketmq.transaction;

import org.apache.rocketmq.client.producer.LocalTransactionState;
import org.apache.rocketmq.client.producer.TransactionListener;
import org.apache.rocketmq.common.message.Message;
import org.apache.rocketmq.common.message.MessageExt;

import java.util.concurrent.ConcurrentHashMap;

/**
 *
 * @fileName: TransactionListenerImpl
 * @author: zengqf3
 * @date: 2021-4-2 16:21
 * @description:
 */
public class TransactionListenerImpl implements TransactionListener {

    /**
     * 存储对应事务的状态信息，key：事务ID，value：当前事务执行的状态
     */
    private ConcurrentHashMap<String, Integer> localTrans = new ConcurrentHashMap<>();

    /**
     * 执行本地事务
     * @param message
     * @param o
     * @return
     */
    @Override
    public LocalTransactionState executeLocalTransaction(Message message, Object o) {

        // 事务ID
        String transactionId = message.getTransactionId();

        // 0：执行中，状态未知，1：本地事务执行成功，2：本地事务执行失败
        localTrans.put(transactionId, 0);

        // 业务执行，处理本地事务
        System.out.println("Hello-----Transaction");

        try {

            System.out.println("正在执行本地事务");
            Thread.sleep(2000);
            System.out.println("本地事务执行成功");

            localTrans.put(transactionId, 1);
        } catch (InterruptedException e) {
            e.printStackTrace();
            localTrans.put(transactionId, 2);
            return LocalTransactionState.ROLLBACK_MESSAGE;
        }

        return LocalTransactionState.COMMIT_MESSAGE;
    }

    /**
     * 消息回查
     * @param messageExt
     * @return
     */
    @Override
    public LocalTransactionState checkLocalTransaction(MessageExt messageExt) {

        // 获取事务ID
        String transactionId = messageExt.getTransactionId();

        // 获取对应事务ID的执行状态
        Integer status = localTrans.get(transactionId);

        System.out.println("消息回查--TransactionId：" + transactionId + ", 状态：" + status);

        switch (status) {
            case 0:
                return LocalTransactionState.UNKNOW;
            case 1:
                return LocalTransactionState.COMMIT_MESSAGE;
            case 2:
                return LocalTransactionState.ROLLBACK_MESSAGE;
        }

        return LocalTransactionState.UNKNOW;
    }
}
